package com.bihua.iot.service.impl;

import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONPath;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.bihua.common.core.domain.PageQuery;
import com.bihua.common.core.page.TableDataInfo;
import com.bihua.common.utils.BeanCopyUtils;
import com.bihua.common.utils.StringUtils;
import com.bihua.common.utils.redis.RedisUtils;
import com.bihua.iot.constant.Constants;
import com.bihua.iot.domain.DeviceDatas;
import com.bihua.iot.domain.TdFields;
import com.bihua.iot.domain.TdSuperTable;
import com.bihua.iot.domain.TdTable;
import com.bihua.iot.domain.bo.DeviceDatasBo;
import com.bihua.iot.domain.bo.DeviceInfoBo;
import com.bihua.iot.domain.bo.ProductBo;
import com.bihua.iot.domain.bo.ProductServicesBo;
import com.bihua.iot.domain.vo.DeviceCommandsVo;
import com.bihua.iot.domain.vo.DeviceDatasVo;
import com.bihua.iot.domain.vo.DeviceInfoVo;
import com.bihua.iot.domain.vo.DeviceVo;
import com.bihua.iot.domain.vo.ProductServicesVo;
import com.bihua.iot.domain.vo.ProductVo;
import com.bihua.iot.enums.DeviceCommandStatus;
import com.bihua.iot.enums.DeviceConnectStatus;
import com.bihua.iot.enums.DeviceType;
import com.bihua.iot.enums.ProtocolType;
import com.bihua.iot.mapper.DeviceDatasMapper;
import com.bihua.iot.models.DeviceInfos;
import com.bihua.iot.models.TopoAddDatas;
import com.bihua.iot.service.DynamicExecuteService;
import com.bihua.iot.service.IDeviceCommandsService;
import com.bihua.iot.service.IDeviceDatasService;
import com.bihua.iot.service.IDeviceInfoService;
import com.bihua.iot.service.IDeviceService;
import com.bihua.iot.service.IProductService;
import com.bihua.iot.service.IProductServicesService;
import com.bihua.iot.service.MqttService;
import com.bihua.iot.service.TdEngineService;
import com.fasterxml.jackson.databind.ObjectMapper;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.UUID;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.json.JSONUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
 * 设备消息Service业务层处理
 *
 * @author bihua
 * @date 2023-06-16
 */
@RequiredArgsConstructor
@Slf4j
@Service
public class DeviceDatasServiceImpl implements IDeviceDatasService {

    // Mapper used by the CRUD methods of this service for DeviceDatas rows.
    private final DeviceDatasMapper baseMapper;
    // Publishes the topo add/delete/update response messages.
    private final MqttService mqttService;
    // Looks up the reporting device by its device identification.
    private final IDeviceService iDeviceService;
    // Looks up the product(s) a device belongs to.
    private final IProductService iProductService;
    // Lists the services of a product; one table is created per service when a sub-device is added.
    private final IProductServicesService iProductServicesService;
    // Creates tables and inserts reported rows through TdTable descriptors.
    private final TdEngineService tdEngineService;
    // Sub-device records: insert, delete, lookup and status update.
    private final IDeviceInfoService iDeviceInfoService;
    // Device command records, updated when a command response arrives.
    private final IDeviceCommandsService iDeviceCommandsService;

    // Database name set on every TdTable created by this service; falls back to "bihua"
    // when the property is not configured.
    @Value("${spring.datasource.dynamic.datasource.td.dbName:bihua}")
    private String dataBaseName;
    /**
     * Looks up a single device message by its primary key.
     *
     * @param id primary key of the device message
     * @return the view object the mapper returns for that id
     */
    @Override
    public DeviceDatasVo queryById(Long id) {
        final DeviceDatasVo found = baseMapper.selectVoById(id);
        return found;
    }

    /**
     * Returns one page of device messages matching the filter fields of {@code bo}.
     *
     * @param bo        filter criteria; only non-blank fields are applied
     * @param pageQuery paging parameters
     * @return the page wrapped as table data
     */
    @Override
    public TableDataInfo<DeviceDatasVo> queryPageList(DeviceDatasBo bo, PageQuery pageQuery) {
        final LambdaQueryWrapper<DeviceDatas> criteria = buildQueryWrapper(bo);
        final Page<DeviceDatasVo> page = baseMapper.selectVoPage(pageQuery.build(), criteria);
        return TableDataInfo.build(page);
    }

    /**
     * Returns every device message matching the filter fields of {@code bo}.
     *
     * @param bo filter criteria; only non-blank fields are applied
     * @return the matching view objects
     */
    @Override
    public List<DeviceDatasVo> queryList(DeviceDatasBo bo) {
        return baseMapper.selectVoList(buildQueryWrapper(bo));
    }

    /**
     * Builds the equality filter shared by the list and page queries.
     * Each condition is only added when the corresponding field of {@code bo} is non-blank.
     *
     * @param bo filter criteria
     * @return a wrapper holding the active equality conditions
     */
    private LambdaQueryWrapper<DeviceDatas> buildQueryWrapper(DeviceDatasBo bo) {
        return Wrappers.<DeviceDatas>lambdaQuery()
            .eq(StringUtils.isNotBlank(bo.getDeviceIdentification()), DeviceDatas::getDeviceIdentification, bo.getDeviceIdentification())
            .eq(StringUtils.isNotBlank(bo.getMessageId()), DeviceDatas::getMessageId, bo.getMessageId())
            .eq(StringUtils.isNotBlank(bo.getTopic()), DeviceDatas::getTopic, bo.getTopic())
            .eq(StringUtils.isNotBlank(bo.getMessage()), DeviceDatas::getMessage, bo.getMessage())
            .eq(StringUtils.isNotBlank(bo.getStatus()), DeviceDatas::getStatus, bo.getStatus());
    }

    /**
     * Inserts a new device message and, on success, writes the generated id back to {@code bo}.
     *
     * @param bo data of the message to insert
     * @return {@code true} when the mapper reports at least one inserted row
     */
    @Override
    public Boolean insertByBo(DeviceDatasBo bo) {
        final DeviceDatas entity = BeanUtil.toBean(bo, DeviceDatas.class);
        validEntityBeforeSave(entity);
        if (baseMapper.insert(entity) <= 0) {
            return false;
        }
        bo.setId(entity.getId());
        return true;
    }

    /**
     * Updates an existing device message by id.
     *
     * @param bo new field values, including the id of the row to update
     * @return {@code true} when the mapper reports at least one updated row
     */
    @Override
    public Boolean updateByBo(DeviceDatasBo bo) {
        final DeviceDatas entity = BeanUtil.toBean(bo, DeviceDatas.class);
        validEntityBeforeSave(entity);
        final int affectedRows = baseMapper.updateById(entity);
        return affectedRows > 0;
    }

    /**
     * Validation hook called before an entity is inserted or updated.
     * Currently a no-op placeholder.
     *
     * @param entity the entity about to be saved
     */
    private void validEntityBeforeSave(DeviceDatas entity){
        // TODO: add data validation, e.g. unique-constraint checks
    }

    /**
     * Deletes device messages in batch.
     *
     * @param ids     primary keys to delete; a {@code null} or empty collection deletes nothing
     * @param isValid whether business validation should run before deleting; {@code null} is
     *                treated as {@code false}
     * @return {@code true} when the mapper reports at least one deleted row
     */
    @Override
    public Boolean deleteWithValidByIds(Collection<Long> ids, Boolean isValid) {
        // Nothing to delete: avoid handing an empty id list to the mapper.
        if (CollectionUtil.isEmpty(ids)) {
            return false;
        }
        // Null-safe check; unboxing a null Boolean would throw a NullPointerException.
        if (Boolean.TRUE.equals(isValid)) {
            // TODO: add business validation deciding whether the rows may be deleted
        }
        return baseMapper.deleteBatchIds(ids) > 0;
    }

    /** Prefix shared by every device topic handled by {@link #insertBaseDatas(JSONObject)}. */
    private static final String DEVICE_TOPIC_PREFIX = "/v1/devices/";
    /** Topic suffix: edge device adds sub-devices. */
    private static final String TOPO_ADD_SUFFIX = "/topo/add";
    /** Topic suffix: edge device deletes sub-devices. */
    private static final String TOPO_DELETE_SUFFIX = "/topo/delete";
    /** Topic suffix: edge device updates sub-device status. */
    private static final String TOPO_UPDATE_SUFFIX = "/topo/update";
    /** Topic suffix: edge device reports data. */
    private static final String DATAS_SUFFIX = "/datas";
    /** Topic suffix: edge device answers a command. */
    private static final String COMMAND_RESPONSE_SUFFIX = "/commandResponse";

    /**
     * Entry point for thinglinks MQTT messages: routes the message to the handler that matches
     * its topic and, for the topo topics, publishes the handler's payload on the matching
     * {@code ...Response} topic.
     *
     * <pre>
     * Topic                                      Publisher     Subscriber    Purpose
     * /v1/devices/{deviceId}/topo/add            edge device   platform      add sub-devices
     * /v1/devices/{deviceId}/topo/addResponse    platform      edge device   response to add
     * /v1/devices/{deviceId}/topo/delete         edge device   platform      delete sub-devices
     * /v1/devices/{deviceId}/topo/deleteResponse platform      edge device   response to delete
     * /v1/devices/{deviceId}/topo/update         edge device   platform      update sub-device status
     * /v1/devices/{deviceId}/topo/updateResponse platform      edge device   response to update
     * /v1/devices/{deviceId}/datas               edge device   platform      report data
     * /v1/devices/{deviceId}/command             platform      edge device   send a command
     * /v1/devices/{deviceId}/commandResponse     edge device   platform      response to a command
     * </pre>
     *
     * NOTE(review): the handlers are invoked on {@code this}; with Spring's default proxy-based
     * transaction handling their own {@code @Transactional} annotations are not expected to take
     * effect for these internal calls - confirm.
     *
     * @param thinglinksMessage message with the string fields {@code topic}, {@code qos},
     *                          {@code body} and {@code time}
     * @throws Exception whatever a handler or the MQTT publish throws
     */
    @Async("linkAsync")
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
    public void insertBaseDatas(JSONObject thinglinksMessage) throws Exception {
        final String topic = thinglinksMessage.getString("topic");
        final String qos = thinglinksMessage.getString("qos");
        final String body = thinglinksMessage.getString("body");
        final String time = thinglinksMessage.getString("time");
        // A message without a topic cannot be routed; previously this ended in a NullPointerException.
        if (StringUtils.isBlank(topic)) {
            log.error("The topic is empty and ignored,Body:{},Time:{}", body, time);
            return;
        }
        if (!JSONUtil.isJson(body)) {
            log.error("Topic:{},The entry is empty and ignored", topic);
            return;
        }
        if (!topic.startsWith(DEVICE_TOPIC_PREFIX)) {
            // TODO: other protocols can be added here
            log.info("Other Topic packets are ignored,Topic:{},Body:{},Time:{}", topic, body, time);
            return;
        }
        if (topic.endsWith(TOPO_ADD_SUFFIX)) {
            final String deviceIdentification = extractDeviceIdentification(topic, TOPO_ADD_SUFFIX);
            publishTopoResponse(topic, qos, this.processingTopoAddTopic(deviceIdentification, body));
        } else if (topic.endsWith(TOPO_DELETE_SUFFIX)) {
            final String deviceIdentification = extractDeviceIdentification(topic, TOPO_DELETE_SUFFIX);
            publishTopoResponse(topic, qos, this.processingTopoDeleteTopic(deviceIdentification, body));
        } else if (topic.endsWith(TOPO_UPDATE_SUFFIX)) {
            final String deviceIdentification = extractDeviceIdentification(topic, TOPO_UPDATE_SUFFIX);
            publishTopoResponse(topic, qos, this.processingTopoUpdateTopic(deviceIdentification, body));
        } else if (topic.endsWith(DATAS_SUFFIX)) {
            this.processingDatasTopic(extractDeviceIdentification(topic, DATAS_SUFFIX), body);
        } else if (topic.endsWith(COMMAND_RESPONSE_SUFFIX)) {
            this.processingTopoCommandResponseTopic(extractDeviceIdentification(topic, COMMAND_RESPONSE_SUFFIX), body);
        } else {
            // TODO: other protocols can be added here
            log.info("Other Topic packets are ignored,Topic:{},Body:{},Time:{}", topic, body, time);
        }
    }

    /**
     * Returns the part of {@code topic} between the device-topic prefix and {@code suffix}.
     */
    private static String extractDeviceIdentification(String topic, String suffix) {
        return StringUtils.substring(topic, DEVICE_TOPIC_PREFIX.length(), -suffix.length());
    }

    /**
     * Publishes {@code payload} on the response topic of {@code requestTopic}.
     * The response topic is the request topic with {@code "Response"} appended; a textual
     * replace of "add"/"delete"/"update" would also rewrite a device identification that
     * happens to contain one of those words.
     */
    private void publishTopoResponse(String requestTopic, String qos, String payload) throws Exception {
        final Map<String, Object> param = new HashMap<>();
        param.put("topic", requestTopic + "Response");
        param.put("qos", parseQos(qos, requestTopic));
        param.put("retain", false);
        param.put("message", payload);
        mqttService.sendMessage(param);
    }

    /**
     * Parses the QoS of the incoming message, falling back to 0 when it is missing or not a
     * number so that the already computed response can still be delivered.
     */
    private static int parseQos(String qos, String topic) {
        if (StringUtils.isNotBlank(qos)) {
            try {
                return Integer.parseInt(qos);
            } catch (NumberFormatException ignored) {
                // handled by the fallback below
            }
        }
        log.warn("Topic:{},The qos [{}] is missing or invalid, 0 is used instead", topic, qos);
        return 0;
    }
    /**
     * Handles the {@code /topo/add} topic: an edge device registers sub-devices.
     * For every reported sub-device a device id is generated, one table per enabled product
     * service is created, and the sub-device record is inserted. The returned JSON carries the
     * request {@code mid}, an overall status and one result entry per sub-device.
     *
     * @param deviceIdentification identification of the reporting edge device
     * @param body                 JSON request body
     * @return JSON response payload to publish on the {@code addResponse} topic
     * @throws Exception if parsing the body or a service call fails
     */
    @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
    public String processingTopoAddTopic(String deviceIdentification, String body) throws Exception {
        final TopoAddDatas topoAddDatas = JSON.toJavaObject(JSON.parseObject(body), TopoAddDatas.class);
        final Map<String, Object> responseMaps = new HashMap<>();
        final List<Map<String, Object>> dataList = new ArrayList<>();
        responseMaps.put("mid", topoAddDatas.getMid());
        responseMaps.put("statusCode", 0);
        responseMaps.put("statusDesc", "successful");
        responseMaps.put("data", dataList);
        final DeviceVo device = iDeviceService.findOneByDeviceIdentification(deviceIdentification);
        if (ObjectUtil.isNull(device)) {
            log.error("The side device reports data processing, but the device does not exist,DeviceIdentification:{},Body:{}", deviceIdentification, body);
            responseMaps.put("statusCode", 1);
            responseMaps.put("statusDesc", "The side device reports data processing, but the device does not exist.");
            return JSON.toJSONString(responseMaps);
        }
        final ProductBo productQuery = new ProductBo();
        productQuery.setProductIdentification(device.getProductIdentification());
        productQuery.setProtocolType(device.getProtocolType());
        final List<ProductVo> products = iProductService.queryList(productQuery);
        if (CollectionUtil.isEmpty(products)) {
            log.error("The side device reports data processing, but the product does not exist,DeviceIdentification:{},Body:{}", deviceIdentification, body);
            responseMaps.put("statusCode", 1);
            responseMaps.put("statusDesc", "The side device reports data processing, but the product does not exist.");
            return JSON.toJSONString(responseMaps);
        }
        // A body without deviceInfos previously ended in a NullPointerException.
        if (topoAddDatas.getDeviceInfos() == null) {
            log.error("The side device reports data processing, but the deviceInfos does not exist,DeviceIdentification:{},Body:{}", deviceIdentification, body);
            responseMaps.put("statusCode", 1);
            responseMaps.put("statusDesc", "The side device reports data processing, but the deviceInfos does not exist.");
            return JSON.toJSONString(responseMaps);
        }
        // The enabled services depend only on the product, so query them once for all sub-devices.
        final ProductServicesBo servicesQuery = new ProductServicesBo();
        servicesQuery.setProductIdentification(device.getProductIdentification());
        servicesQuery.setStatus("0");
        final List<ProductServicesVo> productServices = iProductServicesService.queryList(servicesQuery);

        for (DeviceInfos reported : topoAddDatas.getDeviceInfos()) {
            final String subDeviceId = UUID.randomUUID().toString(true);
            final DeviceInfoBo deviceInfo = new DeviceInfoBo();
            deviceInfo.setDid(device.getId());
            deviceInfo.setNodeId(reported.getNodeId());
            deviceInfo.setNodeName(reported.getName());
            deviceInfo.setDeviceId(subDeviceId);
            deviceInfo.setDescription(reported.getDescription());
            deviceInfo.setModel(reported.getModel());
            deviceInfo.setConnectStatus(DeviceConnectStatus.INIT.getValue());
            deviceInfo.setShadowEnable(true);
            // Create one table per product service; only tables that were created are recorded.
            final List<String> createdTables = createSubDeviceTables(products.get(0), productServices, device, subDeviceId);
            if (!createdTables.isEmpty()) {
                deviceInfo.setShadowTableName(String.join(",", createdTables));
            }
            deviceInfo.setCreateBy(device.getCreateBy());

            final Map<String, Object> responseMap = new HashMap<>();
            // Null-safe: a null result counts as a failed insert.
            if (Boolean.TRUE.equals(iDeviceInfoService.insertByBo(deviceInfo))) {
                responseMap.put("statusCode", 0);
                responseMap.put("statusDesc", "successful");
            } else {
                responseMap.put("statusCode", 1);
                responseMap.put("statusDesc", "abortive");
                log.error("Insert DeviceInfo Exception,DeviceIdentification:{},Body:{}", deviceIdentification, body);
            }
            final Map<String, Object> deviceInfoMap = new HashMap<>();
            deviceInfoMap.put("deviceId", deviceInfo.getDeviceId());
            deviceInfoMap.put("nodeId", deviceInfo.getNodeId());
            deviceInfoMap.put("name", deviceInfo.getNodeName());
            deviceInfoMap.put("description", deviceInfo.getDescription());
            deviceInfoMap.put("model", deviceInfo.getModel());
            responseMap.put("deviceInfo", deviceInfoMap);
            dataList.add(responseMap);
        }
        return JSON.toJSONString(responseMaps);
    }

    /**
     * Creates one table per product service for a newly added sub-device and returns the names
     * of the tables that were created. A failed creation is logged and skipped.
     */
    private List<String> createSubDeviceTables(ProductVo product, List<ProductServicesVo> productServices, DeviceVo device, String subDeviceId) {
        final List<String> createdTables = new ArrayList<>();
        if (CollectionUtil.isEmpty(productServices)) {
            return createdTables;
        }
        for (ProductServicesVo productService : productServices) {
            final TdTable tableDto = new TdTable();
            tableDto.setDataBaseName(dataBaseName);
            // Super table naming rule: productType_productIdentification_serviceName
            final String superTableName = product.getProductType() + "_" + productService.getProductIdentification() + "_" + productService.getServiceName();
            tableDto.setSuperTableName(superTableName);
            // Child table naming rule: productType_productIdentification_serviceName_deviceId
            tableDto.setTableName(superTableName + "_" + subDeviceId);
            // Tag value of the child table.
            // NOTE(review): the tag is the edge device's identification here, while the data
            // handler sets the tag to the reported deviceId - confirm which one is intended.
            final List<TdFields> tagsFieldValues = new ArrayList<>();
            final TdFields tagField = new TdFields();
            tagField.setFieldValue(device.getDeviceIdentification());
            tagsFieldValues.add(tagField);
            tableDto.setTagsFieldValues(tagsFieldValues);
            try {
                tdEngineService.createTable(tableDto);
                createdTables.add(tableDto.getTableName());
                log.info("Create SuperTable Success: {}", tableDto.getTableName());
            } catch (Exception e) {
                // Best effort: keep going with the remaining services, but keep the stack trace.
                log.error("Create SuperTable Exception: {}", e.getMessage(), e);
            }
        }
        return createdTables;
    }

    /**
     * Handles the {@code /topo/delete} topic: an edge device removes sub-devices.
     * Reads {@code mid} and {@code deviceIds} from the body, deletes the listed sub-devices in
     * one call and returns a JSON response with one result entry per requested id.
     *
     * NOTE(review): nothing visible here checks that the listed ids belong to the reporting
     * edge device - confirm whether the service enforces that.
     *
     * @param deviceIdentification identification of the reporting edge device
     * @param body                 JSON request body
     * @return JSON response payload to publish on the {@code deleteResponse} topic
     * @throws Exception if reading the body or the delete call fails
     */
    @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
    public String processingTopoDeleteTopic(String deviceIdentification, String body) throws Exception {
        final Map<String, Object> responseMaps = new HashMap<>();
        final List<Map<String, Object>> dataList = new ArrayList<>();

        // A missing mid no longer aborts the request with a NullPointerException.
        final Object mid = JSONPath.read(body, "$.mid");
        responseMaps.put("mid", mid == null ? null : mid.toString());
        responseMaps.put("statusCode", 0);
        responseMaps.put("statusDesc", "successful");
        responseMaps.put("data", dataList);

        // Convert every reported id to its string form instead of casting the parsed list,
        // so numeric ids do not fail with a ClassCastException.
        final List<String> deviceIds = new ArrayList<>();
        final Object reportedIds = JSONPath.read(body, "$.deviceIds[*]");
        if (reportedIds instanceof Collection) {
            for (Object reportedId : (Collection<?>) reportedIds) {
                if (reportedId != null) {
                    deviceIds.add(reportedId.toString());
                }
            }
        }
        if (CollectionUtil.isEmpty(deviceIds)) {
            log.error("The side device reports data processing, but the deviceId does not exist,DeviceIdentification:{},Body:{}", deviceIdentification, body);
            responseMaps.put("statusCode", 1);
            responseMaps.put("statusDesc", "The side device reports data processing, but the deviceId does not exist.");
            return JSON.toJSONString(responseMaps);
        }
        // The delete call reports one total count, so every id receives the same status.
        final boolean deleted = iDeviceInfoService.deleteByDeviceIds(deviceIds) > 0;
        for (String deviceId : deviceIds) {
            final Map<String, Object> responseMap = new HashMap<>();
            if (deleted) {
                responseMap.put("statusCode", 0);
                responseMap.put("statusDesc", "successful");
            } else {
                responseMap.put("statusCode", 1);
                responseMap.put("statusDesc", "abortive");
                log.error("Delete DeviceInfo Exception");
            }
            responseMap.put("deviceId", deviceId);
            dataList.add(responseMap);
        }
        return JSON.toJSONString(responseMaps);
    }

    /**
     * Handles the {@code /topo/update} topic: an edge device reports sub-device status changes.
     * For every entry of {@code deviceStatuses} the sub-device is looked up, its connect status
     * is set for the values {@code ONLINE} / {@code OFFLINE}, and the record is updated. The
     * returned JSON carries one result entry per reported sub-device.
     *
     * @param deviceIdentification identification of the reporting edge device
     * @param body                 JSON request body
     * @return JSON response payload to publish on the {@code updateResponse} topic
     * @throws Exception if parsing the body or a service call fails
     */
    @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
    public String processingTopoUpdateTopic(String deviceIdentification, String body) throws Exception {
        final JSONObject message = JSON.parseObject(body);
        final Map<String, Object> responseMaps = new HashMap<>();
        final List<Map<String, Object>> dataList = new ArrayList<>();
        // getString yields null for a missing mid instead of throwing.
        responseMaps.put("mid", message.getString("mid"));
        responseMaps.put("statusCode", 0);
        responseMaps.put("statusDesc", "successful");
        responseMaps.put("data", dataList);
        final JSONArray deviceStatuses = message.getJSONArray("deviceStatuses");
        if (ObjectUtil.isNull(deviceStatuses)) {
            log.error("The side device reports data processing, but the deviceStatus does not exist,DeviceIdentification:{},Body:{}", deviceIdentification, body);
            responseMaps.put("statusCode", 1);
            responseMaps.put("statusDesc", "The side device reports data processing, but the deviceStatus does not exist.");
            return JSON.toJSONString(responseMaps);
        }
        for (Object element : deviceStatuses) {
            final Map<?, ?> deviceStatusMap = element instanceof Map ? (Map<?, ?>) element : null;
            final Object reportedDeviceId = deviceStatusMap == null ? null : deviceStatusMap.get("deviceId");
            final Object reportedStatus = deviceStatusMap == null ? null : deviceStatusMap.get("status");
            final String deviceId = reportedDeviceId == null ? null : reportedDeviceId.toString();

            final Map<String, Object> responseMap = new HashMap<>();
            responseMap.put("deviceId", deviceId);
            dataList.add(responseMap);

            // Entries without deviceId or status are reported as failed instead of aborting the request.
            if (deviceId == null || reportedStatus == null) {
                log.error("The side device reports data processing, but the deviceId or status does not exist,DeviceIdentification:{},Body:{}", deviceIdentification, body);
                responseMap.put("statusCode", 1);
                responseMap.put("statusDesc", "abortive");
                continue;
            }
            final DeviceInfoVo deviceInfo = iDeviceInfoService.findOneByDeviceId(deviceId);
            // An unknown sub-device must not be dereferenced; report it as failed and move on.
            if (ObjectUtil.isNull(deviceInfo)) {
                log.error("The side device reports data processing, but the device does not exist,DeviceIdentification:{},Body:{}", deviceIdentification, body);
                responseMap.put("statusCode", 1);
                responseMap.put("statusDesc", "abortive");
                continue;
            }
            final String status = reportedStatus.toString();
            if ("ONLINE".equals(status)) {
                deviceInfo.setConnectStatus(DeviceConnectStatus.ONLINE.getValue());
            } else if ("OFFLINE".equals(status)) {
                deviceInfo.setConnectStatus(DeviceConnectStatus.OFFLINE.getValue());
            }
            // Any other status value leaves the connect status as stored.
            if (Boolean.TRUE.equals(iDeviceInfoService.updateByBo(BeanCopyUtils.copy(deviceInfo, DeviceInfoBo.class)))) {
                responseMap.put("statusCode", 0);
                responseMap.put("statusDesc", "successful");
            } else {
                responseMap.put("statusCode", 1);
                responseMap.put("statusDesc", "abortive");
                log.error("Update DeviceInfo Exception");
            }
        }
        return JSON.toJSONString(responseMaps);
    }

    /** Shared mapper for converting cached super-table descriptors; created once instead of per row. */
    private static final ObjectMapper SUPER_TABLE_MAPPER = new ObjectMapper();

    /**
     * Handles the {@code /datas} topic: a device reports service data.
     * The device is resolved (cache first, then the device service), the body is passed through
     * {@link #convertToBody(String, String)}, and every service entry of every reported device
     * is written as one row through {@code tdEngineService.insertData}.
     * Malformed entries are logged and skipped so that the remaining entries are still stored.
     *
     * @param deviceIdentification identification of the reporting device
     * @param body                 reported body
     * @throws Exception if a lookup or the body conversion fails
     */
    @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
    public void processingDatasTopic(String deviceIdentification, String body) throws Exception {
        final DeviceVo device = loadDeviceRecord(deviceIdentification);
        if (device == null) {
            log.error("The side device reports data processing, but the device does not exist,DeviceIdentification:{},Body:{}", deviceIdentification, body);
            return;
        }
        final ProductBo productQuery = new ProductBo();
        productQuery.setProductIdentification(device.getProductIdentification());
        final List<ProductVo> products = iProductService.queryList(productQuery);
        if (CollectionUtil.isEmpty(products)) {
            log.error("The device reports data processing, but the product of the device does not exist,DeviceIdentification:{},Body:{}", deviceIdentification, body);
            return;
        }
        // Protocol script conversion.
        final String convertedBody = convertToBody(deviceIdentification, body);
        if (!JSONUtil.isJson(convertedBody)) {
            log.error("The device reports data processing, but the converted body is not JSON,DeviceIdentification:{},Body:{}", deviceIdentification, convertedBody);
            return;
        }
        // The converted JSON carries, per device, the reported data, its service and the event time.
        final Map<?, ?> resultMap = JSONUtil.toBean(convertedBody, Map.class);
        final Object devices = resultMap.get("devices");
        if (!(devices instanceof List)) {
            log.error("The device reports data processing, but the devices field does not exist,DeviceIdentification:{},Body:{}", deviceIdentification, convertedBody);
            return;
        }
        for (Object element : (List<?>) devices) {
            final Object deviceId = element instanceof Map ? ((Map<?, ?>) element).get("deviceId") : null;
            if (deviceId == null) {
                log.error("The device reports data processing, but the deviceId does not exist,DeviceIdentification:{},Body:{}", deviceIdentification, convertedBody);
                continue;
            }
            final Map<?, ?> item = (Map<?, ?>) element;
            if (device.getDeviceType().equals(DeviceType.GATEWAY.getValue())) {
                // Gateway: the reported id must be a known sub-device.
                if (ObjectUtil.isNull(iDeviceInfoService.findOneByDeviceId(deviceId.toString()))) {
                    log.error("The side device reports data processing, but the device does not exist,DeviceIdentification:{},Body:{}", deviceIdentification, convertedBody);
                    continue;
                }
            } else if (device.getDeviceType().equals(DeviceType.COMMON.getValue())) {
                // Common device: the reported id must be the device itself.
                if (!StringUtils.equalsIgnoreCase(deviceIdentification, deviceId.toString())) {
                    log.error("The conmon device reports data processing, but the device does not exist,DeviceIdentification:{},Body:{}", deviceIdentification, convertedBody);
                    continue;
                }
            }
            final Object reportedServices = item.get("services");
            final JSONArray services = reportedServices == null ? null : JSON.parseArray(reportedServices.toString());
            // Nothing reported for this device: nothing to store.
            if (services == null || services.isEmpty()) {
                log.error("The side device reports data processing, but the data does not exist,DeviceIdentification:{},Body:{}", deviceIdentification, convertedBody);
                continue;
            }
            for (Object service : services) {
                storeServiceData(deviceIdentification, products.get(0), deviceId, JSON.parseObject(service.toString()));
            }
        }
    }

    /**
     * Resolves the device record, reading the cache first and falling back to the device
     * service; a record found by the service is written to the cache. The cache is read once
     * and null-checked, so an entry that expires between an existence check and the read can
     * no longer yield a null device.
     *
     * @return the device, or {@code null} when it is unknown
     */
    private DeviceVo loadDeviceRecord(String deviceIdentification) {
        final String cacheKey = Constants.DEVICE_RECORD_KEY + deviceIdentification;
        DeviceVo device = RedisUtils.getCacheObject(cacheKey);
        if (device != null) {
            return device;
        }
        device = iDeviceService.findOneByDeviceIdentification(deviceIdentification);
        if (ObjectUtil.isNotNull(device)) {
            RedisUtils.setCacheObject(cacheKey, device);
        }
        return device;
    }

    /**
     * Writes one reported service entry as a row of the device's child table.
     * The entry is skipped when required fields are missing, when no super-table descriptor is
     * cached, or when none of the reported values matches a column.
     */
    private void storeServiceData(String deviceIdentification, ProductVo product, Object deviceId, JSONObject serviceData) {
        final Object serviceId = serviceData == null ? null : serviceData.get("serviceId");
        final Object eventTime = serviceData == null ? null : serviceData.get("eventTime");
        final Object reportedData = serviceData == null ? null : serviceData.get("data");
        if (serviceId == null || eventTime == null || reportedData == null) {
            log.error("The device reports data processing, but serviceId, eventTime or data does not exist,DeviceIdentification:{},Service:{}", deviceIdentification, serviceData);
            return;
        }
        final Map<?, ?> data = JSONUtil.toBean(reportedData.toString(), Map.class);
        // Super table naming rule: productType_productIdentification_serviceName
        final String superTableName = product.getProductType() + "_" + product.getProductIdentification() + "_" + serviceId.toString();
        // Child table naming rule: productType_productIdentification_serviceName_deviceId
        final String tableName = superTableName + "_" + deviceId.toString();
        // The super-table structure is read from the cache by super-table name.
        final Object cacheObject = RedisUtils.getCacheObject(Constants.TDENGINE_SUPERTABLEFILELDS + superTableName);
        final TdSuperTable superTableDto = SUPER_TABLE_MAPPER.convertValue(cacheObject, TdSuperTable.class);
        if (superTableDto == null) {
            log.error("The device reports data processing, but the super table structure is not cached,DeviceIdentification:{},SuperTable:{}", deviceIdentification, superTableName);
            return;
        }
        final List<TdFields> schemaFields = superTableDto.getSchemaFields();
        // No structure information: skip this entry.
        if (schemaFields == null) {
            return;
        }
        // Columns 0 and 1 are filled unconditionally below, so both must exist.
        if (schemaFields.size() < 2) {
            log.error("The device reports data processing, but the super table has fewer than two columns,DeviceIdentification:{},SuperTable:{}", deviceIdentification, superTableName);
            return;
        }
        // Event time in the form yyyyMMddHHmmss, interpreted at offset +08:00, as epoch milliseconds.
        final long eventDataTime;
        try {
            eventDataTime = DateUtil.parseLocalDateTime(eventTime.toString(), "yyyyMMddHHmmss").toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
        } catch (java.time.DateTimeException e) {
            log.error("The device reports data processing, but the eventTime is invalid,DeviceIdentification:{},EventTime:{}", deviceIdentification, eventTime, e);
            return;
        }
        // Column 0 is the row timestamp (current system time); column 1 is the event time.
        schemaFields.get(0).setFieldValue(DateUtil.current());
        schemaFields.get(1).setFieldValue(eventDataTime);
        // Column names are the service property names, which is also how the data is keyed.
        for (TdFields schemaField : schemaFields) {
            assignFieldValueIfFits(schemaField, data.get(schemaField.getFieldName()));
        }
        // Every tag column receives the reported device id.
        final List<TdFields> tagsFields = superTableDto.getTagsFields() == null ? new ArrayList<>() : superTableDto.getTagsFields();
        for (TdFields tagsField : tagsFields) {
            assignFieldValueIfFits(tagsField, deviceId);
        }
        // Only columns that received a value are sent.
        final List<TdFields> populatedSchemaFields = schemaFields.stream().filter(field -> field.getFieldValue() != null).collect(Collectors.toList());
        final List<TdFields> populatedTagsFields = tagsFields.stream().filter(field -> field.getFieldValue() != null).collect(Collectors.toList());
        // The two timestamp columns are always populated, so two or fewer populated columns
        // means none of the reported values matched a property of this service: nothing to save.
        if (populatedSchemaFields.size() <= 2) {
            return;
        }
        final TdTable tableDto = new TdTable();
        tableDto.setDataBaseName(superTableDto.getDataBaseName());
        tableDto.setSuperTableName(superTableDto.getSuperTableName());
        tableDto.setTableName(tableName);
        tableDto.setSchemaFieldValues(populatedSchemaFields);
        tableDto.setTagsFieldValues(populatedTagsFields);
        // TODO: move the insert to asynchronous MQ processing
        try {
            tdEngineService.insertData(tableDto);
            log.info("DeviceIdentification: {}, Insert data result: {}", deviceIdentification, "OK");
        } catch (Exception e) {
            // Best effort: a failed row must not stop the remaining rows, but keep the stack trace.
            log.error("DeviceIdentification: {}, Insert data Exception: {}", deviceIdentification, e.getMessage(), e);
        }
    }

    /**
     * Sets {@code value} on {@code field} unless it is {@code null} or, for an {@code nchar}
     * column with a known size, longer than the column (an oversized value would make the
     * insert statement fail).
     */
    private static void assignFieldValueIfFits(TdFields field, Object value) {
        if (value == null) {
            return;
        }
        final boolean sizedText = field.getSize() != null && "nchar".equals(field.getDataType());
        if (!sizedText || value.toString().length() <= field.getSize()) {
            field.setFieldValue(value);
        }
    }

    /**
     * Handles the {@code /commandResponse} topic: an edge device answers a command.
     * The body's {@code mid} identifies the command record. The record is only updated when it
     * exists, belongs to the reporting device and is still in the WAIT status; {@code errcode}
     * 0 marks it SUCCESS, any other value FAILURE. Responses that fail a check are logged and
     * ignored.
     *
     * @param deviceIdentification identification of the reporting device
     * @param body                 JSON response body
     * @throws Exception if parsing the body or a service call fails
     */
    @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
    public void processingTopoCommandResponseTopic(String deviceIdentification, String body) throws Exception {
        final cn.hutool.json.JSONObject jsonObject = JSONUtil.parseObj(body);
        if (ObjectUtil.isEmpty(jsonObject.get("mid"))) {
            log.error("The side device reports data processing, but the mid field is not included in the body,DeviceIdentification:{},Body:{}", deviceIdentification, body);
            return;
        }
        final Long mid;
        try {
            mid = Long.valueOf(String.valueOf(jsonObject.get("mid")));
        } catch (NumberFormatException e) {
            // A non-numeric mid cannot reference a command record.
            log.error("The side device reports data processing, but the mid field is not a number,DeviceIdentification:{},Body:{}", deviceIdentification, body);
            return;
        }
        final DeviceCommandsVo vo = iDeviceCommandsService.queryById(mid);
        if (ObjectUtil.isNull(vo)) {
            log.error("The side device reports data processing, but the device command does not exist,DeviceIdentification:{},Body:{}", deviceIdentification, body);
            return;
        }
        if (!StringUtils.equalsIgnoreCase(deviceIdentification, vo.getDeviceIdentification())) {
            log.error("The side device reports data processing, but the device identification of the device command is inconsistent,DeviceIdentification:{},Body:{}", deviceIdentification, body);
            return;
        }
        if (!StringUtils.equalsIgnoreCase(vo.getStatus(), DeviceCommandStatus.WAIT.getKey())) {
            log.error("The side device reports data processing, but status of the device command is not WAIT, DeviceIdentification:{},Body:{}", deviceIdentification, body);
            return;
        }
        // getInt converts the value and yields null when errcode is absent or not a number;
        // casting the raw value to int failed for both of those cases.
        final Integer errcode = jsonObject.getInt("errcode");
        if (errcode == null) {
            log.error("The side device reports data processing, but the errcode field is missing or invalid,DeviceIdentification:{},Body:{}", deviceIdentification, body);
            return;
        }
        if (errcode == 0) {
            iDeviceCommandsService.updateCommandResponseById(body, DeviceCommandStatus.SUCCESS.getKey(), mid);
        } else {
            iDeviceCommandsService.updateCommandResponseById(body, DeviceCommandStatus.FAILURE.getKey(), mid);
        }
    }

    /**
     * Protocol conversion of a reported body.
     * When a conversion-script key exists in the cache for this device (MQTT protocol), the
     * body is passed through {@code DynamicExecuteService.executeDynamically}; otherwise it is
     * returned unchanged.
     *
     * @param deviceIdentification identification of the reporting device
     * @param body                 reported body
     * @return the converted body, or {@code body} itself when no script key exists
     */
    public String convertToBody(String deviceIdentification, String body) {
        final String scriptKey = Constants.DEVICE_DATA_REPORTED_AGREEMENT_SCRIPT + ProtocolType.MQTT.getValue() + deviceIdentification;
        final boolean scriptRegistered = Boolean.TRUE.equals(RedisUtils.hasKey(scriptKey));
        if (!scriptRegistered) {
            return body;
        }
        return DynamicExecuteService.executeDynamically(deviceIdentification, body);
    }
}
